Exploring Messaging Systems with Argo Events 您所在的位置:网站首页 argo event Exploring Messaging Systems with Argo Events

Exploring Messaging Systems with Argo Events

2024-06-24 09:07| 来源: 网络整理| 查看: 265

Exploring Messaging Systems with Argo EventsDavid Farr

David Farr

·

Follow

12 min read·Dec 20, 2022

--

Argo

Argo Events is an event-driven workflow automation framework for Kubernetes. It is part of the CNCF and nicely augments Argo Workflows. Check out the GitHub repository for more details.

I consider myself very fortunate to work on the Data Platform team at Intuit. One of the perks of working at Intuit is getting to work closely with the Argo team; the stewards of Argo Workflows, Argo Events, and other highly successful initiatives that together constitute the Argo umbrella of projects — some of the fastest growing CNCF projects.

My primary motivation for writing this post to engage with you — the users of Argo Events and the curious technology enthusiasts. We are always looking for feedback, if the ideas presented here resonate with you please leave a comment or reach out to us on Slack or GitHub.

High Level Argo Events Architecture

My team leverages Argo Events to trigger Argo Workflows when events occur, primarily when other workflows complete. At its core, Argo Events consists of three components: EventSources, EventBuses, and Sensors.

Argo Events architecture from 10,000 feet.EventSource

An EventSource produces proxy events when events in external systems occur. Supported sources include s3, cron, kafka, and many more. When an event occurs, the EventSource produces a proxy as a cloudevent to an EventBus. Each event specifies the event source name as the source and the event name as the subject of the cloudevent. Ordering of events is guaranteed for each (source, subject) pair. That is, messages pertaining to the same event type will be processed downstream in the same order as they are initially produced by the EventSource.

EventBus

An EventBus is a messaging system that stores transitory cloudevents. Currently, Argo Events supports NATS (deprecated) and Jetstream as messaging system technologies. EventSources produce messages to an EventBus and Sensors consume these messages. This decoupling enables multiple actions to be invoked based on a single source of truth.

Sensor

Sensors subscribe to the EventBus and trigger user-defined actions when a trigger condition occurs. Sensors (like EventSources) manifest as Kubernetes deployments that, due to the way they are implemented, cannot be horizontally scaled. If multiple replicas are specified a leader instance processes all events while the other instances are placed on standby. Argo Events uses graft to implement the leader election.

Motivation and Goals

The Data Platform team at Intuit leverages many Sensors to wire up interdependent Argo Workflows. Because each Sensor results in a Kubernetes deployment this has resulted in an unmanageable number of deployments (and transitively, pods) in our cluster. If Sensors could be horizontally scaled this would solve our problem.

Therefore, the goal of our exploration is as follows.

Determine if the current EventBus technology (Jetstream) can be leveraged in a manner to enable horizontal scaling; and if notDetermine alternative EventBus technologies that could be used to achieve horizontal scaling.

In addition to these goals, any proposed solution should ensure the Sensor application satisfies the following properties.

Resiliency; when consumers fail, information should not be lost.High availability; when consumers fail, information should be automatically re-routed to alternate consumers.Per event type ordering; events with the same event type should be processed by the Sensor in the same order as they are produced by the EventSource.Uncoordinated consumers; consumers should not need to share information.Analysis

Before diving into the technology portion of the analysis, we first need to go over Sensors in more detail. At the highest level, a Sensor comprises of dependencies and triggers. Dependencies are the events that triggers depend on, they can be combined together in a trigger condition. Triggers define what actions to take when a trigger condition is satisfied. Argo Events supports multiple actions out-of-the-box including slack, kafka, webhooks, and many more.

Users define their Sensors in namespace-scoped Kubernetes objects. For the purpose of our analysis let’s consider the following Sensor.

apiVersion: argoproj.io/v1alpha1kind: Sensormetadata: name: example-1spec: dependencies: - name: blue - name: yellow - name: red triggers: - name: trigger-1 conditions: blue - name: trigger-2 conditions: blue && yellow - name: trigger-3 conditions: blue && yellow && red

Some required fields have been removed from this Sensor definition for the purpose of clarity. Here, we have a Sensor with three dependencies and three triggers. The first trigger depends solely on the blue event, the second trigger depends on the blue and yellow event, and the third trigger depends on the blue, yellow, and red event.

When a Sensor starts consuming, a handler is invoked one event at a time for each trigger. The state of the trigger is updated to account for the new event and then the trigger condition is evaluated. If the condition is unsatisfied, no further steps are taken. If the condition is satisfied, an action defined on the trigger (omitted in the specification above) is invoked and the state of the trigger is reset.

Jetstream

In the current design, events are consumed by the Sensor from a Jetstream stream. For each trigger, a goroutine is created that maintains independent subscriptions on the stream, one for each dependency the trigger requires. We can visualize the example Sensor above as follows.

Each trigger maintains subscriptions on the stream, one for each required dependency. Trigger processes are always part of the same application instance.

If we want to scale this approach up to multiple application instances, we would need to use a Jetstream queue group. A queue group enables a number of subscribers to receive messages in a round-robin fashion, ensuring messages are processed once by a single instance of the application. This approach has two issues.

Application instances would need to share information regarding event occurrences, violating the coordination principle.Jetstream queue groups do not guarantee ordered processing, violating the ordering guarantees we wish to achieve.

Alternatively, we could scale up our Sensor by divvying up the triggers across instances. However, this approach would still violate our design goals as it would require the instances to coordinate which are responsible for which subset of triggers.

Kafka

To achieve our application goals, let’s consider an alternative EventBus technology — Kafka. Kafka has two features we can leverage in our application design.

Per-partition message orderingTransactionsEach application instance (here we have two) are responsible for a subset of the triggers. The maximum number of instances is equal to the number of triggers defined in the Sensor.

This design utilizes three Kafka topics — the event topic, which can be used by multiple Sensors, and the trigger and action topics, which are specific to a Sensor. Events are consumed from the event topic by a fan out process. These messages are keyed by the (source, subject) pair known to the EventSource, but for our purposes we can imagine these messages have simple keys of {blue, yellow, red}. The fan out process is responsible for publishing messages to the trigger topic, keyed by the trigger name. We can do the fan out in a transaction to ensure exactly once delivery to the trigger topic. The trigger handlers are responsible for publishing a message to the action topic if the trigger condition is satisfied. Crucially, the trigger handlers do not bump the subscription’s trigger topic offset until the condition is satisfied to ensure application resiliency — if the application fails, messages are re-consumed from the trigger topic.

Why not fan out messages in the EventSource?EventSources decouple events from actions. Multiple Sensors can subscribe to events produced by an EventSource which have no knowledge about the Sensors (nor their triggers) that consume from them.

The action topic is introduced to decouple trigger processing from action invocation. When the trigger handler publishes a message to the action topic it becomes free to process new messages. Actions will still be invoked in the correct order thanks to Kafka’s partition ordering guarantee.

This design would allow the Sensor application to scale up to a maximum number of instances equal to the number of triggers in the Sensor definition (assuming the trigger topic has this many partitions). When the scale is less than the number of triggers, at least one instance would be responsible for handling two triggers. The assignment of instances to triggers can be achieved without the instances coordinating amongst themselves as each will be assigned a subset of partitions, and therefore triggers, by Kafka as messages in the trigger topic are keyed by the trigger name.

One issue with this design arises when the number of partitions is less than the number of triggers. To illustrate this, consider a slightly different Sensor definition with disjoint triggers, that is, triggers that do not depend on the same dependencies.

apiVersion: argoproj.io/v1alpha1kind: Sensormetadata: name: example-2spec: dependencies: - name: blue - name: yellow - name: red triggers: - name: trigger-1 conditions: blue && yellow - name: trigger-2 conditions: red

Now, imagine our trigger topic has only a single partition and we consume the following events.

{blue, red}

Our fan out process will publish two messages to the trigger topic and both messages will, necessarily, land on the same partition.

{payload: blue, key: trigger-1, partition: 0, offset: 1}{payload: red, key: trigger-2, partition: 0, offset: 2}

First, a blue event is consumed by the trigger-1 handler. No action will be taken as the trigger condition is not yet satisfied as we’re still waiting for a yellow event.

Next, a red message is consumed by the trigger-2 handler. Unlike the previous trigger, the condition is satisfied and we need to publish a message to the action topic. But should we bump the offset? Let’s walk through two possibilities and consider the behaviour of our application under failure.

Scenario 1: bump the offset for partition 0 to index 2.

In the first scenario if our application fails and is restarted, we begin consuming messages at index 3. The state of the trigger-1 handler is reset, but we have effectively lost the knowledge that the blue event occurred. Our application would not be resilient, violating the first principle our application design.

Scenario 2: maintain the offset for partition 0 at index 0.

In the second scenario if our application fails and is restarted, we begin consuming messages at index 1. We re-consume the blue event and repopulate the state of the trigger-1 handler, so far so good. But then we re-consume the red event and the trigger-2 handler will publish a duplicate message to the action topic, not so good.

The following code outlines possible implementations for both scenarios using the Sarama go client for Kafka. For sake of simplicity, it is assumed there exists an implementation of the Handler interface that implements the methods described below and that the Sensor struct is instantiated with a handler for each trigger.

type Sensor struct { name string consumer sarama.ConsumerGroup producer sarama.AsyncProducer

// Instantiated with a struct conforming to the Handler interface // below for each trigger defined in the Sensor specification. handlers []*Handler}

type Handler interface { // Returns the name of the trigger defined in the Sensor // specification. Name() string

// Updates the state of the handler with the given message. Update(msg *sarama.ConsumerMessage)

// Returns true if the trigger condition is satsfied, otherwise // false. Satisfied() bool

// Returns the smallest offset of all messages the handler has // stored in memory for the given partition. If the handler has no // messages for the partition, returns nil. Offset(partition int) *int

// Returns a message if the trigger condition is satisfied, // otherwise returns nil. Action() *sarama.ProducerMessage

// Resets the state of the handler. Reset()}

func (s *Sensor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for { select { case msg :=



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有